Background

This notebook demonstrates the code required to train two regression models (linear regression and random forest) with Spark ML and deploy them to an MLeap server.

The dataset used for the demo was pulled together from individual cities' data found here: http://insideairbnb.com/get-the-data.html

The sample code has the following sections:

  • Step 1: Load Data: Can be done from a flat file or from an S3 path
  • Step 2: Define Dependent and Independent (continuous and categorical) variables + Prep the data
  • Step 3: Train a linear regression and random forest model
  • Step 4: Convert the Spark Model -> MLeap Model
  • Step 5: Save the serialized models to the file system
  • Step 6: Start MLeap Server and run sample requests against the models

In [2]:
// imports
import java.io.File
import com.esotericsoftware.kryo.io.Output
import com.truecar.mleap.serialization.ml.v1.MlJsonSerializer
import com.truecar.mleap.runtime.transformer.Transformer
import com.truecar.mleap.runtime.transformer
import com.truecar.mleap.spark.MleapSparkSupport._
import org.apache.spark.ml.feature.{StandardScaler, StringIndexer, VectorAssembler}
import org.apache.spark.ml.regression.{RandomForestRegressor, LinearRegression}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._
import ml.bundle.fs.DirectoryBundle
import com.truecar.mleap.runtime.util.LeapFrameUtil
import com.truecar.mleap.runtime.{LocalLeapFrame, LeapFrame}

import spray.json._
import com.truecar.mleap.serialization.mleap.v1.MleapJsonSupport._

Step 1: Load Data - Can be done from a flat file or from an S3 path


In [6]:
// Step 1. Load our Airbnb dataset

val inputFile = "file:///tmp/airbnb.avro"
val outputFileRf = "/tmp/transformer.rf.ml"
val outputFileLr = "/tmp/transformer.lr.ml"

var dataset = sqlContext.read.format("com.databricks.spark.avro").
  load(inputFile)

var datasetFiltered = dataset.filter("price >= 50 AND price <= 750 AND bathrooms > 0.0")
println(dataset.count())
println(datasetFiltered.count())


389255
321588
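
Loading from S3 is the same one-liner with a different path. A minimal sketch, assuming the spark-avro package is on the classpath, S3 credentials are configured for the cluster, and a hypothetical bucket name:

In [ ]:
// Hypothetical S3 location -- substitute your own bucket and key;
// assumes fs.s3n AWS credentials are set in the Hadoop configuration.
val s3InputFile = "s3n://my-bucket/datasets/airbnb.avro"
val datasetFromS3 = sqlContext.read.format("com.databricks.spark.avro").
  load(s3InputFile)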

Summary Statistics


In [7]:
datasetFiltered.select("price", "bedrooms", "bathrooms", "number_of_reviews", "cleaning_fee").describe().show()


+-------+------------------+------------------+-------------------+------------------+-----------------+
|summary|             price|          bedrooms|          bathrooms| number_of_reviews|     cleaning_fee|
+-------+------------------+------------------+-------------------+------------------+-----------------+
|  count|            321588|            321588|             321588|            321588|           321588|
|   mean|131.54961006007687|1.3352426085550455|  1.199068373198005|17.920662462529695|37.64188340360959|
| stddev| 90.10912788720125| 0.846658660106074|0.48305900512627564|27.985814829081626|42.64237791484594|
|    min|              50.0|               0.0|                0.5|                 1|              0.0|
|    max|             750.0|              10.0|                8.0|               735|            700.0|
+-------+------------------+------------------+-------------------+------------------+-----------------+


In [8]:
// Most popular cities (original dataset)
dataset.registerTempTable("df")

sqlContext.sql("""
    select 
        city,
        count(*) as n,
        cast(avg(price) as decimal(12,2)) as avg_price,
        max(price) as max_price
    from df
    group by city
    order by count(*) desc
""").show()


+-------------+-----+---------+---------+
|         city|    n|avg_price|max_price|
+-------------+-----+---------+---------+
|        Paris|49341|    98.95|   1966.0|
|       London|30410|    98.47|   1650.0|
|     New York|27881|   168.17|   5000.0|
|       Berlin|23866|    60.37|    650.0|
|    Barcelona|21723|    73.16|   1100.0|
|     Brooklyn|20055|   117.23|   4500.0|
|  Los Angeles|18198|   134.33|  10000.0|
|    Amsterdam|17736|   129.60|   1900.0|
|San Francisco|11214|   200.74|  10000.0|
|       Madrid|11179|    63.81|   3005.0|
|      Toronto| 9154|   121.32|   2550.0|
|      Chicago| 8208|   137.88|   2000.0|
|       Austin| 7499|   213.07|   2549.0|
|     MontréAl| 7403|    87.29|   2501.0|
|    Vancouver| 6975|   124.98|   1513.0|
|      Seattle| 6331|   127.14|   1000.0|
|   Washington| 5576|   133.84|   1200.0|
|     Portland| 4936|   105.08|    700.0|
|     Montreal| 4797|    94.74|   1300.0|
|    San Diego| 4711|   170.92|   1725.0|
+-------------+-----+---------+---------+
only showing top 20 rows
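
The same aggregation can be written against the DataFrame API directly, without registering a temp table; an equivalent sketch:

In [ ]:
// Equivalent of the SQL above using the DataFrame API
import org.apache.spark.sql.functions.{avg, count, desc, lit, max}

dataset.groupBy("city").
  agg(count(lit(1)).as("n"), avg("price").as("avg_price"), max("price").as("max_price")).
  orderBy(desc("n")).
  show()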


In [9]:
// Most expensive popular cities (original dataset)
dataset.registerTempTable("df")

sqlContext.sql("""
    select 
        city,
        count(*) as n,
        cast(avg(price) as decimal(12,2)) as avg_price,
        max(price) as max_price
    from df
    group by city
    order by avg(price) desc
""").filter("n>25").show()


+--------------------+---+---------+---------+
|                city|  n|avg_price|max_price|
+--------------------+---+---------+---------+
|          Palm Beach| 68|   491.28|   1500.0|
|              Malibu|337|   377.53|   4500.0|
|   Pacific Palisades| 36|   326.00|    850.0|
|         Watsonville| 80|   319.70|    782.0|
|       Darling Point| 65|   309.03|   2001.0|
|       Bilgola Beach| 32|   300.44|    890.0|
|        Avalon Beach| 88|   278.93|   1000.0|
|              Avalon| 82|   270.15|    850.0|
|             Del Mar| 40|   266.20|    900.0|
|            Tamarama|153|   258.26|   1000.0|
|       Playa Del Rey| 34|   255.76|    599.0|
|            La Jolla|124|   254.70|   2400.0|
| Rancho Palos Verdes| 85|   253.44|   1250.0|
|     Manhattan Beach|249|   252.19|   1000.0|
|La CañAda Flintridge| 32|   250.88|    900.0|
| Sydney Olympic Park| 40|   250.55|    520.0|
|              Mosman|239|   246.82|   3701.0|
|            Capitola| 72|   246.50|    650.0|
|          Birchgrove| 35|   240.17|   1000.0|
|             Newport|120|   237.67|    901.0|
+--------------------+---+---------+---------+
only showing top 20 rows

Step 2: Define Dependent and Independent (continuous and categorical) variables + Prep the Data


In [10]:
// Step 2. Create our feature pipeline and train it on the entire dataset
val continuousFeatures = Array("bathrooms",
  "bedrooms",
  "security_deposit",
  "cleaning_fee",
  "extra_people",
  "number_of_reviews",
  "review_scores_rating")

val categoricalFeatures = Array("room_type",
  "host_is_superhost",
  "cancellation_policy",
  "instant_bookable")

val allFeatures = continuousFeatures.union(categoricalFeatures)

In [11]:
// Filter all null values
val allCols = allFeatures.union(Seq("price")).map(datasetFiltered.col)
val nullFilter = allCols.map(_.isNotNull).reduce(_ && _)
datasetFiltered = datasetFiltered.select(allCols: _*).filter(nullFilter).persist()

In [12]:
val Array(trainingDataset, validationDataset) = datasetFiltered.randomSplit(Array(0.7, 0.3))

val continuousFeatureAssembler = new VectorAssembler().
    setInputCols(continuousFeatures).
    setOutputCol("unscaled_continuous_features")
val continuousFeatureScaler = new StandardScaler().
    setInputCol("unscaled_continuous_features").
    setOutputCol("scaled_continuous_features")

val categoricalFeatureIndexers = categoricalFeatures.map {
    feature => new StringIndexer().
      setInputCol(feature).
      setOutputCol(s"${feature}_index")
}

val featureCols = categoricalFeatureIndexers.map(_.getOutputCol).union(Seq("scaled_continuous_features"))
val featureAssembler = new VectorAssembler().
    setInputCols(featureCols).
    setOutputCol("features")
val estimators: Array[PipelineStage] = Array(continuousFeatureAssembler, continuousFeatureScaler).
    union(categoricalFeatureIndexers).
    union(Seq(featureAssembler))
val featurePipeline = new Pipeline().
    setStages(estimators)
val sparkFeaturePipelineModel = featurePipeline.fit(datasetFiltered)

println("Finished constructing the pipeline")


Finished fitting the feature pipeline
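
Before training on top of it, it can help to sanity-check what the feature pipeline learned. A minimal sketch that prints each fitted StringIndexer's label-to-index mapping, assuming the Spark 1.x API used throughout this notebook:

In [ ]:
// Print the labels learned by each fitted StringIndexer;
// index 0 is the most frequent label in the data.
import org.apache.spark.ml.feature.StringIndexerModel

sparkFeaturePipelineModel.stages.collect {
  case indexer: StringIndexerModel =>
    println(s"${indexer.getInputCol}: ${indexer.labels.mkString(", ")}")
}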

Step 3: Train a linear regression and random forest model


In [13]:
// Step 3.1 Create our random forest model
val randomForest = new RandomForestRegressor().
    setFeaturesCol("features").
    setLabelCol("price").
    setPredictionCol("price_prediction")

val sparkPipelineEstimatorRf = new Pipeline().setStages(Array(sparkFeaturePipelineModel, randomForest))
val sparkPipelineRf = sparkPipelineEstimatorRf.fit(trainingDataset)

println("Complete: Training Random Forest")


Complete: Training Random Forest
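
To confirm the forest trained as expected, the model stage can be pulled back out of the fitted pipeline; a minimal sketch, assuming stage 1 is the regressor as constructed above:

In [ ]:
// Stages of the fitted pipeline are [featurePipelineModel, rfModel]
import org.apache.spark.ml.regression.RandomForestRegressionModel

val rfModel = sparkPipelineRf.stages(1).asInstanceOf[RandomForestRegressionModel]
println(s"Trees: ${rfModel.trees.length}, total nodes: ${rfModel.trees.map(_.numNodes).sum}")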

In [14]:
// Step 3.2 Create our linear regression model
val linearRegression = new LinearRegression().
    setFeaturesCol("features").
    setLabelCol("price").
    setPredictionCol("price_prediction")

val sparkPipelineEstimatorLr = new Pipeline().setStages(Array(sparkFeaturePipelineModel, linearRegression))
val sparkPipelineLr = sparkPipelineEstimatorLr.fit(trainingDataset)

println("Complete: Training Linear Regression")


Complete: Training Linear Regression
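
Before eyeballing individual predictions below, it is worth putting a single number on each model. A minimal sketch using spark.ml's RegressionEvaluator on the held-out 30% split:

In [ ]:
// RMSE of each fitted pipeline on the validation split (lower is better)
import org.apache.spark.ml.evaluation.RegressionEvaluator

val evaluator = new RegressionEvaluator().
    setLabelCol("price").
    setPredictionCol("price_prediction").
    setMetricName("rmse")

val rmseRf = evaluator.evaluate(sparkPipelineRf.transform(validationDataset))
val rmseLr = evaluator.evaluate(sparkPipelineLr.transform(validationDataset))
println(s"RMSE -- random forest: $rmseRf, linear regression: $rmseLr")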

In [15]:
// Step 4.1 Assemble the final pipeline (random forest) by implicit conversion to MLeap models
val mleapPipelineRf: transformer.PipelineModel = mleapPipelineModelToMleap.toMleap(sparkPipelineRf)
val mleapRandomForest = mleapPipelineRf.
  transformers(1).
  asInstanceOf[transformer.RandomForestRegressionModel].
  copy(predictionCol = "price_prediction_mleap")

In [16]:
// Step 4.2 Assemble the final pipeline (linear regression) by implicit conversion to MLeap models
val mleapPipelineLr: transformer.PipelineModel = mleapPipelineModelToMleap.toMleap(sparkPipelineLr)
val mleapLinearRegression = mleapPipelineLr.
  transformers(1).
  asInstanceOf[transformer.LinearRegressionModel].
  copy(predictionCol = "price_prediction_mleap")

In [17]:
var scoredRf = sparkPipelineRf.transform(validationDataset)
scoredRf = mleapRandomForest.sparkTransform(scoredRf)

scoredRf.select("bathrooms", "bedrooms", "security_deposit", "number_of_reviews", "price", "price_prediction", "price_prediction_mleap").
  // where("bedrooms>0 and bathrooms>0").
  limit(10).
  show()


+---------+--------+----------------+-----------------+-----+------------------+----------------------+
|bathrooms|bedrooms|security_deposit|number_of_reviews|price|  price_prediction|price_prediction_mleap|
+---------+--------+----------------+-----------------+-----+------------------+----------------------+
|      1.0|     1.0|           100.0|                8| 80.0| 97.43475705708038|     97.43475705708038|
|      1.5|     1.0|           300.0|               24|200.0| 87.99622602478738|     87.99622602478738|
|      1.0|     1.0|             0.0|                3| 75.0| 84.62038489504812|     84.62038489504812|
|      1.0|     1.0|           100.0|                8| 80.0| 97.43475705708038|     97.43475705708038|
|      1.0|     1.0|             0.0|                3| 75.0| 84.62038489504812|     84.62038489504812|
|      1.0|     1.0|           100.0|                3| 70.0| 86.58945594264569|     86.58945594264569|
|      1.0|     1.0|           500.0|                1| 80.0| 99.62425620767974|     99.62425620767974|
|      1.0|     1.0|           200.0|               57|165.0|120.41088779835043|    120.41088779835043|
|      1.0|     1.0|           150.0|                3| 63.0| 83.89706790035493|     83.89706790035493|
|      1.0|     1.0|             0.0|                5| 99.0| 86.22376341232868|     86.22376341232868|
+---------+--------+----------------+-----------------+-----+------------------+----------------------+


In [18]:
var scoredLr = sparkPipelineLr.transform(validationDataset)
scoredLr = mleapLinearRegression.sparkTransform(scoredLr)

scoredLr.select("bathrooms", "bedrooms", "security_deposit", "number_of_reviews", "price", "price_prediction", "price_prediction_mleap").
  where("bedrooms>0 and bathrooms>0").
  limit(10).
  show()


+---------+--------+----------------+-----------------+-----+------------------+----------------------+
|bathrooms|bedrooms|security_deposit|number_of_reviews|price|  price_prediction|price_prediction_mleap|
+---------+--------+----------------+-----------------+-----+------------------+----------------------+
|      1.0|     1.0|           100.0|                8| 80.0|  112.149570389774|      112.149570389774|
|      1.5|     1.0|           300.0|               24|200.0|  86.8904396635898|      86.8904396635898|
|      1.0|     1.0|             0.0|                3| 75.0| 71.31985522633005|     71.31985522633005|
|      1.0|     1.0|           100.0|                8| 80.0|  112.149570389774|      112.149570389774|
|      1.0|     1.0|             0.0|                3| 75.0| 71.31985522633005|     71.31985522633005|
|      1.0|     1.0|           100.0|                3| 70.0| 91.63094177732897|     91.63094177732897|
|      1.0|     1.0|           500.0|                1| 80.0| 93.33458091499398|     93.33458091499398|
|      1.0|     1.0|           200.0|               57|165.0|136.62462299596334|    136.62462299596334|
|      1.0|     1.0|           150.0|                3| 63.0| 86.69003576341353|     86.69003576341353|
|      1.0|     1.0|             0.0|                5| 99.0| 76.41925534848974|     76.41925534848974|
+---------+--------+----------------+-----------------+-----+------------------+----------------------+
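
The Spark and MLeap prediction columns above should agree exactly; a minimal programmatic check of the largest absolute gap between the two:

In [ ]:
// Max |spark - mleap| prediction difference; expect 0.0 for both models
import org.apache.spark.sql.functions.{abs, col, max}

scoredRf.select(max(abs(col("price_prediction") - col("price_prediction_mleap"))).as("rf_max_diff")).show()
scoredLr.select(max(abs(col("price_prediction") - col("price_prediction_mleap"))).as("lr_max_diff")).show()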

Step 5: Save the serialized models to the file system


In [19]:
// Step 5. Save our MLeap pipelines to a directory
val mleapFileRf = new File(outputFileRf)
val mleapFileLr = new File(outputFileLr)

// if you want to save to S3
// val bundleWriter = S3BundleWriter(s3Path)
val bundleWriterRf = DirectoryBundle(mleapFileRf)
val bundleWriterLr = DirectoryBundle(mleapFileLr)

mleapFileRf.mkdirs()
mleapFileLr.mkdirs()

val serializer = MlJsonSerializer

serializer.serializeWithClass(mleapPipelineRf, bundleWriterRf)
serializer.serializeWithClass(mleapPipelineLr, bundleWriterLr)


Out[19]:
Bundle(BundleInfo(ml.bundle.v1.runtime.PipelineModel,0.1.1,false),None,PipelineModel(ArraySeq(PipelineModel(ArraySeq(VectorAssemblerModel([Ljava.lang.String;@29eaf2e3,unscaled_continuous_features), StandardScalerModel(unscaled_continuous_features,scaled_continuous_features,StandardScaler(Some(DenseVector([D@9772c5a)),None)), StringIndexerModel(room_type,room_type_index,StringIndexer(WrappedArray(Entire home/apt, Private room, Shared room))), StringIndexerModel(host_is_superhost,host_is_superhost_index,StringIndexer(WrappedArray(0.0, 1.0))), StringIndexerModel(cancellation_policy,cancellation_policy_index,StringIndexer(WrappedArray(strict, moderate, flexible, super_strict_30, super_strict_60, no_refunds, long_term))), StringIndexerModel(instant_bookable,instant_...
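
To confirm both bundles landed on disk, a quick directory listing (plain java.io, nothing MLeap-specific):

In [ ]:
// List the files MLeap wrote for each serialized pipeline
new File(outputFileRf).listFiles().foreach(println)
new File(outputFileLr).listFiles().foreach(println)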

In [ ]:
// Step 6: serve each saved pipeline on its own port, then POST a sample
// LeapFrame (see the JSON in the next cell) to each /transform endpoint.


// sbt "server/run /tmp/transformer.rf.ml 8080"
// sbt "server/run /tmp/transformer.lr.ml 8081"
// curl -v -XPOST \
//   -H "content-type: application/json" \
//   -d @/Users/hollinwilkins/Workspace/scratch/frame.json http://localhost:8080/transform

// curl -v -XPOST \
//   -H "content-type: application/json" \
//   -d @/Users/hollinwilkins/Workspace/scratch/frame.json http://localhost:8081/transform

In [ ]:
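// Sample frame.json posted to the /transform endpoints in the previous cell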
/*
{
  "schema": {
    "fields": [{
      "name": "bathrooms",
      "dataType": "double"
    }, {
      "name": "bedrooms",
      "dataType": "double"
    }, {
      "name": "security_deposit",
      "dataType": "double"
    }, {
      "name": "cleaning_fee",
      "dataType": "double"
    }, {
      "name": "extra_people",
      "dataType": "double"
    }, {
      "name": "number_of_reviews",
      "dataType": "double"
    }, {
      "name": "review_scores_rating",
      "dataType": "double"
    }, {
      "name": "room_type",
      "dataType": "string"
    }, {
      "name": "host_is_superhost",
      "dataType": "string"
    }, {
      "name": "cancellation_policy",
      "dataType": "string"
    }, {
      "name": "instant_bookable",
      "dataType": "string"
    }]
  },
  "rows": [[2.0, 3.0, 50.0, 30.0, 2.0, 56.0, 90.0, "Entire home/apt", "1.0", "strict", "1.0"]]
}
*/